package com.atguigu.watermark;

import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.MapFunction2Impl;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @author gmd
 * @desc 迟到数据处理
 * @since 2024-11-30 07:17:03
 */
public class DealLateData {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("127.0.0.1", 7777)
                .map(new MapFunction2Impl());

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);
        SingleOutputStreamOperator<WaterSensor> streamOperator = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);

        // 侧流输出标记
        OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = streamOperator.keyBy(sensor -> sensor.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // 推迟2s关窗
                .sideOutputLateData(lateTag) // 关窗后的迟到数据，放入侧输出流
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();
                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );

        process.print();
        // 从主流获取侧输出流，打印
        process.getSideOutput(lateTag).printToErr("关窗后的迟到数据：");
        env.execute();
    }


    /*
     * 乱序与迟到的区别
     * 乱序：数据的顺序乱了，时间小的比时间大的晚来
     * 迟到：数据的时间戳 < 当前的watermark
     *
     * 乱序、迟到数据的处理
     * 1. watermark中指定乱序等待时间
     * 2. 如果开窗，设置窗口允许迟到
     *    - 推迟关窗时间，在关窗之前，迟到数据来了，还能被窗口计算，来一条迟到数据触发一次计算
     *    - 关窗后，迟到数据不会被计算
     * 3. 关窗后的迟到数据，放入侧输出流
     *
     * 为什么不直接watermark等待5s或者窗口允许迟到5s？
     * - watermark等待时间不会设太大，因为会影响计算延迟
     *    - 如果3s，窗口第一次触发计算和输出，13s的数据来，13-3=10s
     *    - 如果5s，窗口第一次触发计算和输出，15s的数据来，15-5=10s
     * - 窗口允许迟到，是对大部分迟到数据的处理，尽量让结果准确
     *    - 如果只设置允许迟到5s，那么就会导致频繁重新输出
     *
     * TODO 设置经验
     * 1. watermark等待时间，设置一个不算特别大的，一般是秒级，在乱序和延迟取舍
     * 2. 设置一定的窗口允许迟到，只考虑大部分的迟到数据，极端小部分迟到很久的数据，不管
     * 3. 极端小部分迟到很久的数据，放到侧输出流。获取到之后可以做各种处理
     */

}
